{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here we can install some packages our notebook needs. We can also install them in our container to speed things up & make it more reliable. But for prototyping this works great!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": true
    }
   },
   "outputs": [],
   "source": [
    "!pip3 install --upgrade lxml\n",
    "!pip3 install --upgrade pandas\n",
    "!pip3 install --upgrade scikit-learn\n",
    "!pip3 install --upgrade scipy\n",
    "!pip3 install --upgrade tables"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can use Jupyter notebooks just like normal inside of Kubeflow"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from datetime import datetime\n",
    "from requests import get\n",
    "from lxml import etree\n",
    "from time import sleep\n",
    "\n",
    "import re\n",
    "\n",
    "import pandas as pd\n",
    "\n",
    "import os"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "container_registry = \"\" # Wherever you put your containers"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def scrapeMailArchives(mailingList: str, year: int, month: int):\n",
    "    baseUrl = \"http://mail-archives.apache.org/mod_mbox/%s/%s.mbox/ajax/\" % (mailingList, datetime(year,month,1).strftime(\"%Y%m\"))\n",
    "    r = get(baseUrl + \"thread?0\")\n",
    "    utf8_parser = etree.XMLParser(encoding='utf-8')\n",
    "    root = etree.fromstring(r.text.replace('encoding=\"UTF-8\"', \"\"),  parser=utf8_parser)\n",
    "    output = []\n",
    "    for message in root.xpath(\"//message\"):\n",
    "        _id = message.get(\"id\")\n",
    "        linked = message.get(\"linked\")\n",
    "        depth = message.get(\"depth\")\n",
    "        fr = message.xpath(\"from\")[0].text\n",
    "        dt = message.xpath(\"date\")[0].text ## todo convert to date\n",
    "        subject = message.xpath(\"subject\")[0].text\n",
    "        r2 = get(baseUrl + _id)\n",
    "        bodyRoot = etree.fromstring(r2.text.replace('encoding=\"UTF-8\"', \"\"),  parser=utf8_parser)\n",
    "        body = bodyRoot.xpath(\"//contents\")[0].text\n",
    "        record = {\n",
    "            \"id\"        : _id,\n",
    "            \"linked\"    : linked,\n",
    "            \"depth\"     : depth,\n",
    "            \"from\"      : fr,\n",
    "            \"dt\"        : dt,\n",
    "            \"subject\"   : subject,\n",
    "            \"body\"      : body\n",
    "        }\n",
    "        output.append(record)\n",
    "        sleep(0.1)\n",
    "    return output\n",
    "\n",
    "\n",
    "def extract_links(body):\n",
    "    link_regex_str = r'(http(|s)://(.*?))([\\s\\n]|$)'\n",
    "    itr = re.finditer(link_regex_str, body, re.MULTILINE)\n",
    "    return list(map(lambda elem: elem.group(1), itr))\n",
    "\n",
    "def extract_domains(links):\n",
    "    from urllib.parse import urlparse\n",
    "    def extract_domain(link):\n",
    "        try:\n",
    "            nloc = urlparse(link).netloc\n",
    "            # We want to drop www and any extra spaces wtf nloc on the spaces.\n",
    "            regex_str = r'^(www\\.|)(.*?)\\s*$'\n",
    "            match = re.search(regex_str, nloc)\n",
    "            return match.group(2)\n",
    "        except:\n",
    "            return None\n",
    "    return list(map(extract_domain, links))\n",
    "\n",
    "def contains_python_stack_trace(body):\n",
    "    return \"Traceback (most recent call last)\" in body\n",
    "\n",
    "def contains_probably_java_stack_trace(body):\n",
    "    # Look for something based on regex\n",
    "    # Tried https://stackoverflow.com/questions/20609134/regular-expression-optional-multiline-java-stacktrace - more msg looking\n",
    "    # Tried https://stackoverflow.com/questions/3814327/regular-expression-to-parse-a-log-file-and-find-stacktraces\n",
    "    # Yes the compile is per call, but it's cached so w/e\n",
    "    import re\n",
    "    stack_regex_str = r'^\\s*(.+Exception.*):\\n(.*\\n){0,3}?(\\s+at\\s+.*\\(.*\\))+'\n",
    "    match = re.search(stack_regex_str, body, re.MULTILINE)\n",
    "    return match is not None\n",
    "\n",
    "def contains_exception_in_task(body):\n",
    "    # Look for a line along the lines of ERROR Executor: Exception in task\n",
    "    return \"ERROR Executor: Exception in task\" in body"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "datesToScrape =  [(2019, i) for i in range(1,13)]\n",
    "\n",
    "records = []\n",
    "for y,m in datesToScrape:\n",
    "    print(m,\"-\",y)\n",
    "    records += scrapeMailArchives(\"spark-dev\", y, m)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = pd.DataFrame(records)\n",
    "df['links'] = df['body'].apply(extract_links)\n",
    "df['containsPythonStackTrace'] = df['body'].apply(contains_python_stack_trace)\n",
    "df['containsJavaStackTrace'] = df['body'].apply(contains_probably_java_stack_trace)\n",
    "df['containsExceptionInTaskBody'] = df['body'].apply(contains_exception_in_task)\n",
    "\n",
    "df['domains'] = df['links'].apply(extract_domains)\n",
    "df['isThreadStart'] = df['depth'] == '0'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sklearn.feature_extraction.text import TfidfVectorizer\n",
    "\n",
    "bodyV = TfidfVectorizer()\n",
    "# bodyV = TfidfVectorizer(max_features=10000) #if we cared about making this 1:1 w holden's code.\n",
    "bodyFeatures = bodyV.fit_transform(df['body'])\n",
    "\n",
    "domainV = TfidfVectorizer()\n",
    "# domainV = TfidfVectorizer(max_features=100)\n",
    "\n",
    "## A couple of \"None\" domains really screwed the pooch on this one. Also, no lists just space seperated domains.\n",
    "def makeDomainsAList(d):\n",
    "    return ' '.join([a for a in d if not a is None])\n",
    "\n",
    "domainFeatures = domainV.fit_transform(df['domains'].apply(makeDomainsAList))\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "\n",
    "from scipy.sparse import csr_matrix, hstack\n",
    "\n",
    "data = hstack([csr_matrix(df[['containsPythonStackTrace', 'containsJavaStackTrace', 'containsExceptionInTaskBody', 'isThreadStart']].to_numpy()),\n",
    "                             bodyFeatures,\n",
    "                            domainFeatures])\n",
    "\n",
    "\n",
    "from sklearn.cluster import KMeans\n",
    "from sklearn.model_selection import train_test_split\n",
    "\n",
    "train, test = train_test_split(data, test_size=0.1)\n",
    "\n",
    "kmeans = KMeans(n_clusters=2, random_state=42).fit(train)\n",
    "train_pred = kmeans.predict(train)\n",
    "test_pred = kmeans.predict(test)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Alternatively, by structuring our code correctly we can take advantage of pipelines"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip3 install --upgrade kfp"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp\n",
    "import kfp.dsl as dsl"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def download_data(year: int) -> str:\n",
    "    \n",
    "    from datetime import datetime\n",
    "    from lxml import etree\n",
    "    from requests import get\n",
    "    from time import sleep\n",
    "    \n",
    "    import json\n",
    "    \n",
    "    def scrapeMailArchives(mailingList: str, year: int, month: int):\n",
    "        baseUrl = \"http://mail-archives.apache.org/mod_mbox/%s/%s.mbox/ajax/\" % (mailingList, datetime(year,month,1).strftime(\"%Y%m\"))\n",
    "        r = get(baseUrl + \"thread?0\")\n",
    "        utf8_parser = etree.XMLParser(encoding='utf-8')\n",
    "        root = etree.fromstring(r.text.replace('encoding=\"UTF-8\"', \"\"),  parser=utf8_parser)\n",
    "        output = []\n",
    "        for message in root.xpath(\"//message\"):\n",
    "            _id = message.get(\"id\")\n",
    "            linked = message.get(\"linked\")\n",
    "            depth = message.get(\"depth\")\n",
    "            fr = message.xpath(\"from\")[0].text\n",
    "            dt = message.xpath(\"date\")[0].text ## todo convert to date\n",
    "            subject = message.xpath(\"subject\")[0].text\n",
    "            r2 = get(baseUrl + _id)\n",
    "            bodyRoot = etree.fromstring(r2.text.replace('encoding=\"UTF-8\"', \"\"),  parser=utf8_parser)\n",
    "            body = bodyRoot.xpath(\"//contents\")[0].text\n",
    "            record = {\n",
    "                \"id\"        : _id,\n",
    "                \"linked\"    : linked,\n",
    "                \"depth\"     : depth,\n",
    "                \"from\"      : fr,\n",
    "                \"dt\"        : dt,\n",
    "                \"subject\"   : subject,\n",
    "                \"body\"      : body\n",
    "            }\n",
    "            output.append(record)\n",
    "            sleep(0.1)\n",
    "            \n",
    "        return output\n",
    "\n",
    "    datesToScrape =  [(year, i) for i in range(1,2)]\n",
    "\n",
    "    records = []\n",
    "    ## todo, go back further\n",
    "    for y,m in datesToScrape:\n",
    "        print(m,\"-\",y)\n",
    "        records += scrapeMailArchives(\"spark-dev\", y, m)\n",
    "    import os\n",
    "    output_path = '/data_processing/data.json'\n",
    "    with open(output_path, 'w') as f:\n",
    "        json.dump(records, f)\n",
    "    \n",
    "    return output_path\n",
    "    "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def download_tld_data() -> str:\n",
    "    from requests import get\n",
    "    import pandas as pd\n",
    "    print(\"importing io....\")\n",
    "    import io\n",
    "\n",
    "    url = \"https://pkgstore.datahub.io/core/country-list/data_csv/data/d7c9d7cfb42cb69f4422dec222dbbaa8/data_csv.csv\"\n",
    "    print(\"Getting the url\")\n",
    "    s = get(url).content\n",
    "    print(\"Converting content\")\n",
    "    df = pd.read_csv(io.StringIO(s.decode('utf-8')))\n",
    "    print(\"Writing output\")\n",
    "    output_path_hdf = '/tld_info/clean_data.hdf'\n",
    "    df.to_hdf(output_path_hdf, key=\"tld\")\n",
    "    \n",
    "    return output_path_hdf"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now that we have some data, we want to get rid of any \"bad\" records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#tag::clean_data_fun[]\n",
    "def clean_data(input_path: str) -> str:\n",
    "    import json\n",
    "    import pandas as pd\n",
    "    \n",
    "    print(\"loading records...\")\n",
    "    with open(input_path, 'r') as f:\n",
    "        records = json.load(f)\n",
    "    print(\"records loaded\")\n",
    "    \n",
    "    df = pd.DataFrame(records)\n",
    "    # Drop records without a subject, body, or sender\n",
    "    cleaned = df.dropna(subset=[\"subject\", \"body\", \"from\"])\n",
    "    \n",
    "    output_path_hdf = '/data_processing/clean_data.hdf'\n",
    "    cleaned.to_hdf(output_path_hdf, key=\"clean\")\n",
    "    \n",
    "    return output_path_hdf\n",
    "#end::clean_data_fun[]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Preparing the data\n",
    "\n",
    "Remember earlier when we did that big (and arguably pointless) classification of emails from the Apache Spark mailing list? OK, now we're going to do it again, as a \"lightweight\" Python function in a Kubeflow Pipeline.  I hope the irony of the term \"lightweight\" isn't lost on anyone, because this is pretty blatent abuse of something that was originally presented for conveinience. \n",
    "\n",
    "First note, all of the imports and declarations of helper functions MUST be with in the \"ligthweight\" function. One could argue (and they would probably be correct) that I have two steps here- feature prep and ML, and as such I should split them. I would say that's fair, but I choose not to do so at this time.  Perhaps in some scripts later on?\n",
    "\n",
    "As has been pointed out so many times before, we assume the reader either arleady understands what is going on with the KMeans clustering, or better yet, doesn't even care. I won't be digging into that right now. What I will point out- and maybe as a note to the editor, the model that is finally saved really ought to be persisted somewhere.  If the model isn't saved, then this basically pointless pipeline, is truly pointless. \n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's make sure we can read that data in the next step (before we write a big complicated model to do whatever torture to it)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def prepare_features(input_path: str, tld_info_path: str):\n",
    "   \n",
    "    import re\n",
    "    import pandas as pd\n",
    "    \n",
    "    print(\"loading records...\")\n",
    "    df = pd.read_hdf(input_path, key=\"clean\")\n",
    "    print(\"records loaded\")\n",
    "    \n",
    "    print(\"Loading tld info....\")\n",
    "    tld_df = pd.read_hdf(tld_info_path, key=\"tld\")\n",
    "    print(\"Loaded tld info\")\n",
    "    \n",
    "    \n",
    "    ## Note: \"Lightweight\" Python Fns mean helper code must be inside the fn. (Bad Form)\n",
    "    def extract_links(body):\n",
    "        link_regex_str = r'(http(|s)://(.*?))([\\s\\n]|$)'\n",
    "        itr = re.finditer(link_regex_str, body, re.MULTILINE)\n",
    "        return list(map(lambda elem: elem.group(1), itr))\n",
    "\n",
    "    def extract_domains(links):\n",
    "        from urllib.parse import urlparse\n",
    "        def extract_domain(link):\n",
    "            try:\n",
    "                nloc = urlparse(link).netloc\n",
    "                # We want to drop www and any extra spaces wtf nloc on the spaces.\n",
    "                regex_str = r'^(www\\.|)(.*?)\\s*$'\n",
    "                match = re.search(regex_str, nloc)\n",
    "                return match.group(2)\n",
    "            except:\n",
    "                return None\n",
    "        return list(map(extract_domain, links))\n",
    "\n",
    "    def contains_python_stack_trace(body):\n",
    "        return \"Traceback (most recent call last)\" in body\n",
    "\n",
    "    def contains_probably_java_stack_trace(body):\n",
    "        # Look for something based on regex\n",
    "        # Tried https://stackoverflow.com/questions/20609134/regular-expression-optional-multiline-java-stacktrace - more msg looking\n",
    "        # Tried https://stackoverflow.com/questions/3814327/regular-expression-to-parse-a-log-file-and-find-stacktraces\n",
    "        # Yes the compile is per call, but it's cached so w/e\n",
    "        import re\n",
    "        stack_regex_str = r'^\\s*(.+Exception.*):\\n(.*\\n){0,3}?(\\s+at\\s+.*\\(.*\\))+'\n",
    "        match = re.search(stack_regex_str, body, re.MULTILINE)\n",
    "        return match is not None\n",
    "\n",
    "    def contains_exception_in_task(body):\n",
    "        # Look for a line along the lines of ERROR Executor: Exception in task\n",
    "        return \"ERROR Executor: Exception in task\" in body\n",
    "\n",
    "    print(df.shape)\n",
    "    df['links'] = df['body'].apply(extract_links)\n",
    "    df['containsPythonStackTrace'] = df['body'].apply(contains_python_stack_trace)\n",
    "    df['containsJavaStackTrace'] = df['body'].apply(contains_probably_java_stack_trace)\n",
    "    df['containsExceptionInTaskBody'] = df['body'].apply(contains_exception_in_task)\n",
    "\n",
    "    #tag::local_mailing_list_feature_prep_fun[]\n",
    "    df['domains'] = df['links'].apply(extract_domains)\n",
    "    df['isThreadStart'] = df['depth'] == '0'\n",
    "    \n",
    "    # Arguably, you could split building the dataset away from the actual witchcraft.\n",
    "    from sklearn.feature_extraction.text import TfidfVectorizer\n",
    "\n",
    "    bodyV = TfidfVectorizer()\n",
    "    bodyFeatures = bodyV.fit_transform(df['body'])\n",
    "\n",
    "    domainV = TfidfVectorizer()\n",
    "\n",
    "    ## A couple of \"None\" domains really screwed the pooch on this one.Also, no lists just space seperated domains.\n",
    "    def makeDomainsAList(d):\n",
    "        return ' '.join([a for a in d if not a is None])\n",
    "\n",
    "    domainFeatures = domainV.fit_transform(df['domains'].apply(makeDomainsAList))\n",
    "\n",
    "    from scipy.sparse import csr_matrix, hstack\n",
    "\n",
    "    data = hstack([csr_matrix(df[['containsPythonStackTrace',\n",
    "                                  'containsJavaStackTrace',\n",
    "                                  'containsExceptionInTaskBody', \n",
    "                                  'isThreadStart']].to_numpy()),\n",
    "                                 bodyFeatures,\n",
    "                                domainFeatures])\n",
    "    #end::local_mailing_list_feature_prep_fun[]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n",
    "### The Kubeflow Bit.\n",
    "\n",
    "Now we can put these two pieces together into a pipeline. Since the data is relatively small we will use a persistent volume put them together. Later on we can add training to this pipeline as well.\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Make a volume example. We redo it inside of the pipeline definition because we need to be inside\n",
    "#tag::makeVolume[]\n",
    "dvop = dsl.VolumeOp(\n",
    "    name=\"create_pvc\",\n",
    "    resource_name=\"my-pvc-2\",\n",
    "    size=\"5Gi\",\n",
    "    modes=dsl.VOLUME_MODE_RWO)\n",
    "#end::makeVolume[]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!rm local-data-prep-2.zip"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#tag::makePipeline[]\n",
    "@kfp.dsl.pipeline(\n",
    "  name='Simple1',\n",
    "  description='Simple1'\n",
    ")\n",
    "def my_pipeline_mini(year: int):\n",
    "    dvop = dsl.VolumeOp(\n",
    "        name=\"create_pvc\",\n",
    "        resource_name=\"my-pvc-2\",\n",
    "        size=\"5Gi\",\n",
    "        modes=dsl.VOLUME_MODE_RWO)\n",
    "    tldvop = dsl.VolumeOp(\n",
    "        name=\"create_pvc\",\n",
    "        resource_name=\"tld-volume-2\",\n",
    "        size=\"100Mi\",\n",
    "        modes=dsl.VOLUME_MODE_RWO)\n",
    "    download_data_op = kfp.components.func_to_container_op(\n",
    "        download_data,\n",
    "        packages_to_install=['lxml', 'requests'])\n",
    "    download_tld_info_op = kfp.components.func_to_container_op(\n",
    "        download_tld_data,\n",
    "        packages_to_install=['requests', 'pandas>=0.24', 'tables'])\n",
    "    clean_data_op = kfp.components.func_to_container_op(\n",
    "        clean_data,\n",
    "        packages_to_install=['pandas>=0.24', 'tables'])\n",
    "\n",
    "    step1 = download_data_op(year).add_pvolumes({\"/data_processing\": dvop.volume})\n",
    "    step2 = clean_data_op(input_path=step1.output).add_pvolumes({\"/data_processing\": dvop.volume})\n",
    "    step3 = download_tld_info_op().add_pvolumes({\"/tld_info\": tldvop.volume})\n",
    "\n",
    "kfp.compiler.Compiler().compile(my_pipeline_mini, 'local-data-prep-2.zip')\n",
    "#end::makePipeline[]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!rm *.zip"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#tag::longPipeline[]\n",
    "@kfp.dsl.pipeline(\n",
    "  name='Simple1',\n",
    "  description='Simple1'\n",
    ")\n",
    "def my_pipeline2(year: int):\n",
    "    dvop = dsl.VolumeOp(\n",
    "        name=\"create_pvc\",\n",
    "        resource_name=\"my-pvc-2\",\n",
    "        size=\"5Gi\",\n",
    "        modes=dsl.VOLUME_MODE_RWO)\n",
    "    tldvop = dsl.VolumeOp(\n",
    "        name=\"create_pvc\",\n",
    "        resource_name=\"tld-volume-2\",\n",
    "        size=\"100Mi\",\n",
    "        modes=dsl.VOLUME_MODE_RWO)\n",
    "\n",
    "    download_data_op = kfp.components.func_to_container_op(\n",
    "        download_data,\n",
    "        packages_to_install=['lxml', 'requests'])\n",
    "    download_tld_info_op = kfp.components.func_to_container_op(\n",
    "        download_tld_data,\n",
    "        packages_to_install=['requests', 'pandas>=0.24', 'tables'])\n",
    "    clean_data_op = kfp.components.func_to_container_op(\n",
    "        clean_data,\n",
    "        packages_to_install=['pandas>=0.24', 'tables'])\n",
    "#tag::add_feature_step[]\n",
    "    prepare_features_op = kfp.components.func_to_container_op(\n",
    "        prepare_features,\n",
    "        packages_to_install=['pandas>=0.24', 'tables', 'scikit-learn'])\n",
    "#tag::end_feature_step[]\n",
    "\n",
    "    step1 = download_data_op(year).add_pvolumes({\"/data_processing\": dvop.volume})\n",
    "    step2 = clean_data_op(input_path=step1.output).add_pvolumes({\"/data_processing\": dvop.volume})\n",
    "    step3 = download_tld_info_op().add_pvolumes({\"/tld_info\": tldvop.volume})\n",
    "    step4 = prepare_features_op(input_path=step2.output, tld_info_path=step3.output).add_pvolumes({\n",
    "        \"/data_processing\": dvop.volume,\n",
    "        \"/tld_info\": tldvop.volume})\n",
    "#end::longPipeline[]\n",
    "\n",
    "kfp.compiler.Compiler().compile(my_pipeline2, 'local-data-and-feature-prep-2.zip')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "client = kfp.Client()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "my_experiment = client.create_experiment(name='local-data-prep-test-2')\n",
    "my_run = client.run_pipeline(my_experiment.id, 'local-data-prep', \n",
    "  'local-data-and-feature-prep-2.zip', params={'year': '2019'})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If we were using Spamassasin or some other library installed in a different base container we would:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Clean data with custom container\n",
    "#tag::cleanDataWithContainer[]\n",
    "clean_data_op = kfp.components.func_to_container_op(\n",
    "    clean_data,\n",
    "    base_image=\"{0}/kubeflow/spammassisan\".format(container_registry),\n",
    "    packages_to_install=['pandas>=0.24', 'tables'])\n",
    "#end::cleanDataWithContainer[]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def train_func(input_path: String):\n",
    "    from sklearn.cluster import KMeans\n",
    "    from sklearn.model_selection import train_test_split\n",
    "\n",
    "    train, test = train_test_split(data, test_size=0.1)\n",
    "\n",
    "    kmeans = KMeans(n_clusters=2, random_state=42).fit(train)\n",
    "    train_pred = kmeans.predict(train)\n",
    "    test_pred = kmeans.predict(test)\n",
    "    print(test_pred)\n",
    "    # TODO: Dump the model somewhere you can use it later. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "And just like that, we've done it. We've created a Kubeflow Pipeline.\n",
    "\n",
    "So let's take a moment to step back and think, \"what in the crazy-town-heck is going on here?!\".  A valid question, and well spotted.  Each \"Step\" is going to be creating a container.  Maybe I should have noted that earlier when talking about attatching volumes, beacuse if you thougth I was doing that to a function, you'd probably think me quite insane. \n",
    "\n",
    "But, if you follow this code, and create this pipeline, download it and run it, you will see each \"step\" as a seperate container, downloading data, saving it to a `PVC` then passing some parameters to a next container, which also will load the `PVC`, etc. etc.  \n",
    "\n",
    "### Using Python to Create Containers, but not like a crazy person\n",
    "\n",
    "For completeness, let's last explore how to do all of these things using annotations. \n",
    "\n",
    "The trick for the most part is to create a function that returns a `kfp.dsl.ContainerOp`.  This will point to an image, note the volumes that need to be mounted, and a number of other things. I've heard told people don't always just like creating absurdly large and fat functions to do everything in real life, so I leave this hear as an aside in case the reader is interested in it.  It's alsow worth noting that adding the `@kfp.dsl.component` annotation instructs teh Kubeflow compiler to turn on static typce checking. \n",
    "\n",
    "```\n",
    "@kfp.dsl.component\n",
    "def my_component(my_param):\n",
    "  ...\n",
    "  return kfp.dsl.ContainerOp(\n",
    "    name='My component name',\n",
    "    image='gcr.io/path/to/container/image'\n",
    "  )\n",
    "```\n",
    "\n",
    "Finally, when it comes to incorporating these components into pipelines, you would do something like this:\n",
    "\n",
    "```\n",
    "@kfp.dsl.pipeline(\n",
    "  name='My pipeline',\n",
    "  description='My machine learning pipeline'\n",
    ")\n",
    "def my_pipeline(param_1: PipelineParam, param_2: PipelineParam):\n",
    "  my_step = my_component(my_param='a')\n",
    "```\n",
    "\n",
    "Which should look exceedingly familiar as we did something very similar with our `download_data_fn` and `witchcraft_fn`.  "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
